package co.recloud.ariadne.server;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;

import co.recloud.ariadne.model.Host;
import co.recloud.ariadne.model.logical.Assignment;
import co.recloud.ariadne.model.logical.Column;
import co.recloud.ariadne.model.logical.Command;
import co.recloud.ariadne.model.logical.Condition;
import co.recloud.ariadne.model.logical.Join;
import co.recloud.ariadne.model.logical.Table;
import co.recloud.ariadne.model.logical.Transaction;
import co.recloud.ariadne.model.plan.FullScan;
import co.recloud.ariadne.model.plan.HashJoin;
import co.recloud.ariadne.model.plan.HashLookup;
import co.recloud.ariadne.model.plan.Link;
import co.recloud.ariadne.model.plan.Operator;
import co.recloud.ariadne.model.plan.Project;
import co.recloud.ariadne.model.plan.Select;
import co.recloud.ariadne.persistor.KeyKeyValuePersistor;
import co.recloud.ariadne.persistor.KeyKeyValuePersistorImpl;
import co.recloud.ariadne.persistor.KeyValuePersistor;
import co.recloud.ariadne.persistor.KeyValuePersistorImpl;
import co.recloud.ariadne.persistor.SchemaPersistor;
import co.recloud.ariadne.persistor.SchemaPersistorImpl;
import co.recloud.ariadne.queue.TransactionQueue;
import co.recloud.ariadne.request.GridUpdateRequest;
import co.recloud.ariadne.request.HostRequest;
import co.recloud.ariadne.request.TransactionStartGossipRequest;
import co.recloud.ariadne.request.TransitionRequest;
import co.recloud.ariadne.response.HostResponse;
import co.recloud.ariadne.store.HostTable;
import co.recloud.ariadne.thread.Main;

/**
 * Serve client requests
 * 
 * @author alex
 * 
 */
public class APIServer {

	private List<Map<String, Object>> currentResultSet;
	private static long nextRsIndex = 1;
	private Transaction transaction = null;
	private boolean userTransaction = false;
	private Set<Host> affectedHosts;

	/**
	 * Handle an incoming command
	 * 
	 * @param localhost
	 * @param port
	 * @param command
	 * @return
	 */
	public int serve(String localhost, int port, Command command) {
		int result = 0;
		try {
			switch (command.getCommandType()) {
			case Command.LINK:
				result = serveLinkCommand(localhost, port, command);
				break;
			case Command.UNLINK:
				result = serveUnlinkCommand(localhost, port, command);
				break;
			case Command.UPDATE:
				result = serveUpdateCommand(localhost, port, command);
				break;
			case Command.SELECT:
				currentResultSet = null;
				result = serveSelectCommand(localhost, port, command);
				break;
			case Command.BEGIN:
				startTransaction();
				userTransaction = true;
				break;
			case Command.COMMIT:
				userTransaction = false;
				commitTransaction();
				break;
			case Command.ABORT:
				userTransaction = false;
				break;
			case Command.ADD_HOST:
				result = serveSeedHost(localhost, port, command);
				break;
			default:
				break;
			}
		} catch (Exception e) {
			e.printStackTrace();
			result = -1;
		}
		return result;
	}

	private int serveSeedHost(String localhost, int port, Command command) {
		HostTable ht = HostTable.getInstance();
		Host host = command.getHost();
		HostRequest hostRequest = new HostRequest();
		hostRequest.setAddedHost(ht.getLocalhost());
		hostRequest.setHostTable(ht);
		hostRequest.setReturnHostTable(true);
		hostRequest.setSeeded(false);
		hostRequest.sendBlocking(host);
		HostResponse response = (HostResponse) hostRequest.getResponse();
		if (response != null) {
			System.out.println(response.getHostTable().getTokenRing());
			ht.merge(response.getHostTable());
			System.out.println(ht.getTokenRing());
			if (ht.expandGrid()) {
				Set<Host> serviceHosts = ht.getHostsInService();
				for (Host coHost : serviceHosts) {
					TransitionRequest transReq = new TransitionRequest();
					transReq.setLocation(coHost.getLocation());
					transReq.setNewStatus(Main.STATUS_SERVICE_PENDING);
					transReq.setSourceHost(ht.getLocalhost());
					transReq.sendBlocking(coHost);
				}
				System.out.println(ht.getLocationToHost());
				for (Host coHost : ht.getTokenRing().values()) {
					System.out.println("sending grid update.... " + coHost);
					GridUpdateRequest gridReq = new GridUpdateRequest();
					gridReq.setHostTable(ht);
					gridReq.setHops(0);
					gridReq.setSourceHost(ht.getLocalhost());
					gridReq.sendBlocking(coHost);
				}
				for (Host coHost : serviceHosts) {
					TransitionRequest transReq = new TransitionRequest();
					transReq.setLocation(coHost.getLocation());
					transReq.setNewStatus(Main.STATUS_SERVICE);
					transReq.setSourceHost(ht.getLocalhost());
					transReq.sendBlocking(coHost);
				}
			}
			return 0;
		} else {
			return 1;
		}

	}

	private void startTransaction() {
		if (transaction == null && !userTransaction) {
			affectedHosts = new HashSet<Host>();
			transaction = new Transaction();
			transaction.setState(Transaction.READY);
			transaction.generate();
			TransactionStartGossipRequest startReq = new TransactionStartGossipRequest();
			startReq.setTransaction(transaction);
			startReq.gossip();
		}
	}

	/**
	 * Find a near-optimal join tree
	 * 
	 * @param tableOps
	 * @param tables
	 * @param tableCanJoin
	 * @param childOf
	 * @param parentOf
	 * @param conditionMap
	 * @return
	 */
	private Operator orderJoins(List<Operator> tableOps, List<String> tables,
			Map<String, Boolean> tableCanJoin,
			Map<String, List<String>> childOf,
			Map<String, List<String>> parentOf,
			Map<String, Map<String, Condition>> conditionMap) {
		int listSize = tables.size();
		float[][][] join = new float[listSize][listSize][3];
		boolean[][] canJoin = new boolean[listSize][listSize];
		Condition[][] conditions = new Condition[listSize][listSize];
		for (int n = 0; n < listSize; n++) {
			for (int i = 0; i < n; i++) {
				join[i][n][0] = -1;
			}
		}
		for (int i = 0; i < listSize; i++) {
			canJoin[i][i] = tableCanJoin.get(tables.get(i));
			join[i][i][1] = i;
		}
		for (int n = 1; n < listSize; n++) {
			for (int i = n - 1; i >= 0; i--) {
				for (int k = i; k < n; k++) {
					float newCard = -1;
					boolean pairCanJoin = false;
					float newK = -1;
					float pOrC = -1;
					String leftTable = null;
					String rightTable = null;
					for (int l = i; l <= k; l++) {
						boolean doBreak = false;
						for (int r = k + 1; r <= n; r++) {
							leftTable = tables.get(l);
							rightTable = tables.get(r);
							if (childOf.containsKey(leftTable)
									&& childOf.get(leftTable).contains(
											rightTable)) {
								newCard = join[i][k][0];
								pairCanJoin = canJoin[i][k];
								newK = k;
								pOrC = 0;
								doBreak = true;

								break;
							} else if (parentOf.containsKey(leftTable)
									&& parentOf.get(leftTable).contains(
											rightTable)) {
								newCard = join[k + 1][n][0];
								pairCanJoin = canJoin[k+1][n];
								newK = k + 1;
								pOrC = 1;
								doBreak = true;
								break;
							}
						}
						if (doBreak) {
							break;
						}
					}
					if (pairCanJoin || join[i][n][0] == -1 || newCard < join[i][n][0]) {
						canJoin[i][n] = pairCanJoin;
						join[i][n][0] = newCard;
						join[i][n][1] = newK;
						join[i][n][2] = pOrC;
						if (pOrC == 0) {
							conditions[i][n] = conditionMap.get(leftTable).get(
									rightTable);
						} else {
							conditions[i][n] = conditionMap.get(rightTable)
									.get(leftTable);
						}
					}
				}
			}
		}
		return buildJoinTree(join, 0, listSize - 1, tables, tableOps,
				conditions);
	}

	/**
	 * Build the join tree
	 * 
	 * @param join
	 * @param start
	 * @param end
	 * @param tables
	 * @param tableOps
	 * @param conditions
	 * @return
	 */
	private Operator buildJoinTree(float[][][] join, int start, int end,
			List<String> tables, List<Operator> tableOps,
			Condition[][] conditions) {
		if (end - start > 0) {
			int nextStart = 0;
			int nextEnd = 0;
			if (join[start][end][2] == 0) {
				nextStart = (int) join[start][end][1] + 1;
				nextEnd = (int) join[start][end][1];
			} else {
				nextStart = (int) join[start][end][1];
				nextEnd = (int) join[start][end][1] - 1;
			}
			HashJoin root = new HashJoin(transaction);
			root.setBaseTables(new HashSet<Table>());
			root.setTable(getNewResultSet());
			root.setCondition(conditions[start][end]);
			root.setChildren(new ArrayList<Operator>());
			root.getChildren().add(
					buildJoinTree(join, start, nextEnd, tables, tableOps,
							conditions));
			root.getChildren().add(
					buildJoinTree(join, nextStart, end, tables, tableOps,
							conditions));
			root.getBaseTables().addAll(root.getLeftChild().getBaseTables());
			root.getBaseTables().addAll(root.getRightChild().getBaseTables());
			root.setTableAliases(root.getLeftChild().getTableAliases());
			if (root.getLeftChild() instanceof HashJoin) {
				((HashJoin) root.getLeftChild())
						.setClusterColumn(conditions[start][end]
								.getLeftColumn());
			}
			if (root.getRightChild() instanceof HashJoin) {
				((HashJoin) root.getRightChild())
						.setClusterColumn(conditions[start][end]
								.getRightColumn());
			}
			return root;
		} else {
			return tableOps.get(start);
		}
	}

	/**
	 * Get a new result set
	 * 
	 * @return
	 */
	private Table getNewResultSet() {
		Table rs = new Table();
		rs.setName("RS" + nextRsIndex);
		rs.setResultSet(true);
		nextRsIndex++;
		return rs;
	}

	/**
	 * Push projects down through the query plan
	 * 
	 * @param project
	 * @param parent
	 * @param child
	 */
	private void pushDownProjects(Project project, Operator parent,
			Operator child) {
		Project newProject = null;
		if (child instanceof HashJoin) {
			newProject = new Project(transaction);
			newProject.setColumns(new HashSet<Column>());
			HashJoin hjChild = (HashJoin) child;
			for (Column column : project.getColumns()) {
				if (child.getBaseTables().contains(column.getTable())) {
					newProject.getColumns().add(column);
				}
			}
			if (!newProject.getColumns().contains(hjChild.getLeftKey())) {
				newProject.getColumns().add(hjChild.getLeftKey());
			}
			if (!newProject.getColumns().contains(hjChild.getRightKey())) {
				newProject.getColumns().add(hjChild.getRightKey());
			}
		}
		if (child instanceof HashLookup) {
			newProject = new Project(transaction);
			newProject.setColumns(new HashSet<Column>());
			for (Column column : project.getColumns()) {
				if (child.getBaseTables().contains(column.getTable())) {
					newProject.getColumns().add(column);
				}
			}
			HashLookup hlChild = (HashLookup) child;
			if (!newProject.getColumns().contains(
					hlChild.getCondition().getLeftColumn())) {
				newProject.getColumns().add(
						hlChild.getCondition().getLeftColumn());
			}
		}
		if (newProject != null) {
			newProject.setBaseTables(child.getBaseTables());
			newProject.setTableAliases(child.getTableAliases());
			newProject.setChildren(new ArrayList<Operator>());
			newProject.getChildren().add(child);
			newProject.setTable(getNewResultSet());
			if (parent != null) {
				parent.getChildren().set(parent.getChildren().indexOf(child),
						newProject);
			}
			if (!(child instanceof HashLookup) && !(child instanceof FullScan)) {
				if (child.getChildren() != null) {
					for (int i = 0; i < child.getChildren().size(); i++) {
						Operator deepChild = child.getChildren().get(i);
						pushDownProjects(newProject, child, deepChild);
					}
				}
			}
		}
	}

	/**
	 * Handle a select
	 * 
	 * @param localhost
	 * @param port
	 * @param command
	 * @return
	 */
	private int serveSelectCommand(String localhost, int port, Command command) {
		startTransaction();
		int result = -1;
		try {
			Project project = new Project(transaction);
			project.setColumns(command.getColumns());
			project.setTable(getNewResultSet());

			if (command.getCondition() != null) {
				Select select = new Select(transaction);
				select.setParent(project);
				project.setChildren(new ArrayList<Operator>());
				project.getChildren().add(select);
			}
			Condition baseCondition = command.getCondition();
			List<Operator> joinedTables = new ArrayList<Operator>();
			List<String> joinedTableNames = new ArrayList<String>();
			Map<String, Boolean> tableCanJoin = new HashMap<String, Boolean>();
			Map<String, List<String>> childOf = new HashMap<String, List<String>>();
			Map<String, List<String>> parentOf = new HashMap<String, List<String>>();
			Map<String, Map<String, Condition>> conditionMap = new HashMap<String, Map<String, Condition>>();
			Map<String, Table> aliasMap = new HashMap<String, Table>();
			Table baseTable = command.getTable();
			Operator baseTableOp = null;
			joinedTableNames.add(baseTable.getAlias());
			aliasMap.put(baseTable.getAlias(), baseTable);
			if (baseCondition.getLeftColumn().getTable().getAlias()
					.equals(baseTable.getAlias())) {
				FullScan childOp = new FullScan(transaction);
				childOp.setTable(baseTable);
				baseTableOp = new HashLookup(transaction);
				baseTableOp.setTable(getNewResultSet());
				((HashLookup) baseTableOp).setCondition(baseCondition);
				baseTableOp.setChildren(new ArrayList<Operator>());
				baseTableOp.getChildren().add(childOp);
				tableCanJoin.put(baseTable.getAlias(), true);
			} else {
				baseTableOp = new FullScan(transaction);
				baseTableOp.setTable(baseTable);
				tableCanJoin.put(baseTable.getAlias(), false);
			}
			baseTableOp.setTableAliases(aliasMap);
			baseTableOp.setBaseTables(new HashSet<Table>());
			baseTableOp.getBaseTables().add(baseTable);
			Join currentJoin = baseTable.getJoin();
			joinedTables.add(baseTableOp);
			while (currentJoin != null) {
				Table joinedTable = currentJoin.getTable();
				Operator joinedTableOp = null;
				joinedTableNames.add(joinedTable.getAlias());
				aliasMap.put(joinedTable.getAlias(), joinedTable);
				if (baseCondition.getLeftColumn().getTable().getAlias()
						.equals(joinedTable.getAlias())) {
					FullScan childOp = new FullScan(transaction);
					childOp.setTable(joinedTable);
					joinedTableOp = new HashLookup(transaction);
					joinedTableOp.setTable(getNewResultSet());
					joinedTableOp.setChildren(new ArrayList<Operator>());
					joinedTableOp.getChildren().add(childOp);
					((HashLookup) joinedTableOp).setCondition(baseCondition);
					tableCanJoin.put(joinedTable.getAlias(), true);
				} else {
					joinedTableOp = new FullScan(transaction);
					joinedTableOp.setTable(joinedTable);
					tableCanJoin.put(joinedTable.getAlias(), false);
				}
				joinedTableOp.setBaseTables(new HashSet<Table>());
				joinedTableOp.getBaseTables().add(joinedTable);
				joinedTableOp.setTableAliases(aliasMap);
				joinedTables.add(joinedTableOp);
				Condition currentCondition = currentJoin.getCondition();
				Table leftTable = currentCondition.getLeftColumn().getTable();
				Table rightTable = currentCondition.getRightColumn().getTable();
				Table childTable = null;
				Table parentTable = null;
				if (currentCondition.getLeftColumn().getField()
						.equals(Table.PRIMARY_KEY)) {
					childTable = rightTable;
					parentTable = leftTable;
				} else {
					childTable = leftTable;
					parentTable = rightTable;
				}
				if (!childOf.containsKey(childTable.getAlias())) {
					childOf.put(childTable.getAlias(), new ArrayList<String>());
				}
				childOf.get(childTable.getAlias()).add(parentTable.getAlias());
				if (!parentOf.containsKey(parentTable.getAlias())) {
					parentOf.put(parentTable.getAlias(),
							new ArrayList<String>());
				}
				if (!conditionMap.containsKey(childTable.getAlias())) {
					conditionMap.put(childTable.getAlias(),
							new HashMap<String, Condition>());
				}
				conditionMap.get(childTable.getAlias()).put(
						parentTable.getAlias(), currentCondition);
				parentOf.get(parentTable.getAlias()).add(childTable.getAlias());
				currentJoin = currentJoin.getJoin();
			}
			baseCondition.getLeftColumn().setTable(
					aliasMap.get(baseCondition.getLeftColumn().getTable()
							.getAlias()));
			renameJoinConditions(baseTable, aliasMap);
			Operator tableRoot = orderJoins(joinedTables, joinedTableNames,
					tableCanJoin, childOf, parentOf, conditionMap);
			renameProjectColumns(project, aliasMap);
			project.setColumns(command.getColumns());
			project.setTableAliases(aliasMap);
			project.setBaseTables(tableRoot.getBaseTables());
			project.setChildren(new ArrayList<Operator>());
			project.getChildren().add(tableRoot);
			pushDownProjects(project, project, tableRoot);
			project.execute(localhost, port);
			currentResultSet = project.dumpTable();
			commitTransaction();
			result = 1;
		} catch (Exception e) {
			e.printStackTrace();
			result = -1;
		}
		return result;
	}

	private void commitTransaction() {
		if (!userTransaction) {
			transaction.setCommitTime(Main.getTime());
            BlockingQueue<Transaction> commitQueue = TransactionQueue.getQueue();
            commitQueue.add(transaction);
			transaction = null;
		}
	}

	/**
	 * Make sure join conditions have resolved tables
	 * 
	 * @param joinedTables
	 */
	private void renameJoinConditions(Table baseTable,
			Map<String, Table> tableAliases) {
		Join currentJoin = baseTable.getJoin();
		while (currentJoin != null) {
			Condition condition = currentJoin.getCondition();
			condition.getLeftColumn().setTable(
					tableAliases.get(condition.getLeftColumn().getTable()
							.getAlias()));
			condition.getRightColumn().setTable(
					tableAliases.get(condition.getRightColumn().getTable()
							.getAlias()));
			currentJoin = currentJoin.getJoin();
		}
	}

	/**
	 * Rename aliased tables in the project columns
	 * 
	 * @param project
	 * @param aliasMap
	 */
	private void renameProjectColumns(Project project,
			Map<String, Table> aliasMap) {
		Set<Column> columns = new HashSet<Column>();
		for (Column projectColumn : project.getColumns()) {
			String projectName = projectColumn.getTable().getName();
			if (aliasMap.containsKey(projectName)) {
				projectColumn.setTable(aliasMap.get(projectName));
			}
		}
		columns.addAll(project.getColumns());
		project.setColumns(columns);

	}

	/**
	 * Handle an update
	 * 
	 * @param localhost
	 * @param port
	 * @param command
	 * @return
	 */
	private int serveUpdateCommand(String localhost, int port, Command command) {
		startTransaction();
		SchemaPersistor schemaPersistor = new SchemaPersistorImpl(transaction);
		schemaPersistor.setLocalHostName(localhost);
		String key = command.getCondition().getRightValue();
		Map<String, Table> parentTables = new HashMap<String, Table>();
		Table table = command.getTable();
		for (Assignment assignment : command.getAssignments()) {
			String field = assignment.getColumn().getField();
			Table parentTable = schemaPersistor.getParentTable(assignment
					.getColumn());
			if (parentTable != null) {
				parentTables.put(field, parentTable);
			}
		}
		KeyValuePersistor kvPersistor = new KeyValuePersistorImpl(transaction);
		KeyKeyValuePersistor kkvPersistor = new KeyKeyValuePersistorImpl(
				transaction);

		for (Assignment assignment : command.getAssignments()) {
			String field = assignment.getColumn().getField();
			String value = assignment.getValue();
			Column childColumn = Column.parseColumn(field, table, null);
			kvPersistor.setValue(key, childColumn, value);
			if (parentTables.containsKey(field)) {
				Column parentPK = Column.parseColumn(Table.PRIMARY_KEY,
						parentTables.get(field), null);
				String parentKey = value;
				String oldParentKey = kvPersistor.getValue(parentKey,
						childColumn);
				if (oldParentKey != null && !oldParentKey.equals(parentKey)) {
					kkvPersistor.deleteParentToChild(parentPK, oldParentKey,
							childColumn, key);
				}
				kkvPersistor.setParentToChild(parentPK, parentKey, childColumn,
						key);
			}
		}
		commitTransaction();
		return 0;
	}

	private int serveLinkCommand(String localhost, int port, Command command) {
		startTransaction();
		Link link = new Link(transaction);
		link.setTable(command.getTable());
		link.setBaseColumn(command.getColumns().iterator().next());
		link.execute(localhost, port);
		commitTransaction();
		return 0;
	}

	private int serveUnlinkCommand(String localhost, int port, Command command) {
		startTransaction();
		SchemaPersistor schemaPersistor = new SchemaPersistorImpl(transaction);
		schemaPersistor.setLocalHostName(localhost);
		Column baseColumn = command.getColumns().iterator().next();
		schemaPersistor.unlink(baseColumn, command.getTable());
		commitTransaction();
		return 0;
	}

	public List<Map<String, Object>> getResultSet() {
		return currentResultSet;
	}

	public Transaction getTransaction() {
		return transaction;
	}

	/**
	 * @param affectedHosts
	 *            the affectedHosts to set
	 */
	public void setAffectedHosts(Set<Host> affectedHosts) {
		this.affectedHosts = affectedHosts;
	}

	/**
	 * @return the affectedHosts
	 */
	public Set<Host> getAffectedHosts() {
		return affectedHosts;
	}
}
